/*
 * Copyright 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.cloud.bigtable.data.v2.models.RowAdapter;

/**
 * Strips the special marker rows generated by {@link RowMergingCallable} out of a ReadRows stream.
 *
 * <p>Every row emitted by the wrapped callable is checked with the configured {@link RowAdapter}.
 * Rows reported as scan markers are dropped; every other row, along with the error and completion
 * signals, is handed to the caller's observer unchanged.
 *
 * <p>This class is considered an internal implementation detail and not meant to be used by
 * applications.
 */
@InternalApi
public class FilterMarkerRowsCallable<RowT> extends ServerStreamingCallable<ReadRowsRequest, RowT> {
  private final ServerStreamingCallable<ReadRowsRequest, RowT> upstream;
  private final RowAdapter<RowT> adapter;

  /**
   * Creates a callable that filters the rows produced by {@code inner}.
   *
   * @param inner the callable whose responses are filtered
   * @param rowAdapter used to decide whether a given row is a scan marker
   */
  public FilterMarkerRowsCallable(
      ServerStreamingCallable<ReadRowsRequest, RowT> inner, RowAdapter<RowT> rowAdapter) {
    this.upstream = inner;
    this.adapter = rowAdapter;
  }

  /**
   * Starts the wrapped call with an observer that sits between it and {@code responseObserver}.
   *
   * @param request the request, forwarded as is to the wrapped callable
   * @param responseObserver the caller's observer; it never receives scan marker rows
   * @param context the call context, forwarded as is to the wrapped callable
   */
  @Override
  public void call(
      ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
    upstream.call(request, new MarkerDroppingObserver(responseObserver), context);
  }

  /** Observer attached to the wrapped call; relays everything except scan marker rows. */
  private class MarkerDroppingObserver implements ResponseObserver<RowT> {
    private final ResponseObserver<RowT> downstream;
    private StreamController upstreamController;
    // Flipped to true once the downstream observer turns off automatic inbound flow control.
    private boolean manualFlowControl;

    MarkerDroppingObserver(ResponseObserver<RowT> downstream) {
      this.downstream = downstream;
    }

    /**
     * Remembers the upstream controller and starts the downstream observer with a forwarding
     * controller wrapped around it.
     */
    @Override
    public void onStart(StreamController controller) {
      upstreamController = controller;
      downstream.onStart(new ForwardingController(controller));
    }

    /** Relays regular rows downstream and silently consumes scan marker rows. */
    @Override
    public void onResponse(RowT response) {
      if (!adapter.isScanMarkerRow(response)) {
        downstream.onResponse(response);
        return;
      }

      // The marker never reaches the downstream observer. Under manual flow control, ask the
      // upstream for one more response to stand in for the one that was dropped.
      if (manualFlowControl) {
        upstreamController.request(1);
      }
    }

    @Override
    public void onError(Throwable t) {
      downstream.onError(t);
    }

    @Override
    public void onComplete() {
      downstream.onComplete();
    }

    /**
     * Controller handed to the downstream observer. Every operation is delegated to the upstream
     * controller; disabling automatic flow control is additionally recorded on the observer.
     */
    private class ForwardingController implements StreamController {
      private final StreamController target;

      ForwardingController(StreamController target) {
        this.target = target;
      }

      @Override
      public void cancel() {
        target.cancel();
      }

      @Override
      public void disableAutoInboundFlowControl() {
        // Record the mode before delegating, so the flag is set even if the delegate throws.
        manualFlowControl = true;
        target.disableAutoInboundFlowControl();
      }

      @Override
      public void request(int count) {
        target.request(count);
      }
    }
  }
}
